// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_catalog::plan::InvertedIndexInfo;
use databend_common_catalog::plan::PushDownInfo;
use databend_common_exception::Result;
use databend_common_expression::types::F32;
use opendal::Operator;
use tantivy::query::QueryParser;
use tantivy::schema::Field;
use tantivy::tokenizer::TokenizerManager;
use tantivy::Score;

use crate::io::create_index_schema;
use crate::io::create_tokenizer_manager;
use crate::io::read::InvertedIndexReader;
use crate::io::TableMetaLocationGenerator;

// Each block file has a corresponding index file;
// the data in the index file is generated by tantivy.
// The index searcher returns the matched docIds and scores for the query text.
// If no docId is matched, the corresponding block can be pruned.
//
// ┌────────┐     ┌────────┐   ┌────────┐     ┌────────┐
// │ Index1 │ ... │ IndexM │   │ IndexN │ ... │ IndexZ │
// └────────┘     └────────┘   └────────┘     └────────┘
//     |              |            |              |
//     |              |            |              |
// ┌────────┐     ┌────────┐   ┌────────┐     ┌────────┐
// │ Block1 │ ... │ BlockM │   │ BlockN │ ... │ BlockZ │
// └────────┘     └────────┘   └────────┘     └────────┘
//  \                     /     \                     /
//   \          _________/       \          _________/
//    \        /                  \        /
//     Segment1           ...      SegmentN
//
/// Prunes blocks by running the pushed-down inverted index query against
/// the index file that belongs to each block.
///
/// Instances are built by `try_create` from the push-down info and are then
/// queried once per block through `should_keep`.
pub struct InvertedIndexPruner {
    // Storage operator, cloned into every `InvertedIndexReader` that is created.
    dal: Operator,
    // Number of fields in the index schema.
    field_nums: usize,
    // Copied from `InvertedIndexInfo::has_score`: whether the score internal
    // column needs to be generated.
    has_score: bool,
    // Set when at least one term of the parsed query text reports that it
    // needs positions (phrase terms), i.e. the position file is required.
    need_position: bool,
    // Tantivy fields resolved from the query field names via the index schema.
    query_fields: Vec<Field>,
    // `(field, boost)` pairs for the query fields that carry an explicit boost.
    query_field_boosts: Vec<(Field, Score)>,
    // Tokenizer manager built from the index options; cloned into each reader.
    tokenizer_manager: TokenizerManager,
    // The pushed-down index info; supplies the index name, index version and
    // query text used when filtering a block.
    inverted_index_info: InvertedIndexInfo,
}

impl InvertedIndexPruner {
    /// Builds a pruner from the optional push-down information.
    ///
    /// Returns `Ok(None)` when `push_down` is `None` or carries no inverted
    /// index info; otherwise returns the pruner wrapped in an `Arc`.
    ///
    /// # Errors
    ///
    /// Propagates the error when a query field name cannot be resolved in the
    /// index schema, when the tantivy index schema cannot be created, or when
    /// the query text fails to parse.
    pub fn try_create(
        dal: Operator,
        push_down: &Option<PushDownInfo>,
    ) -> Result<Option<Arc<InvertedIndexPruner>>> {
        // Nothing to prune with unless the push-down carries inverted index info.
        let info = match push_down.as_ref().and_then(|p| p.inverted_index.as_ref()) {
            Some(info) => info,
            None => return Ok(None),
        };

        // Resolve every query field name to its tantivy field and remember the
        // boost of those fields that have one.
        let query_field_count = info.query_fields.len();
        let mut query_fields = Vec::with_capacity(query_field_count);
        let mut query_field_boosts = Vec::with_capacity(query_field_count);
        for (name, boost) in &info.query_fields {
            let field_index = info.index_schema.index_of(name)?;
            let field = Field::from_field_id(field_index as u32);
            query_fields.push(field);
            if let Some(boost) = boost {
                query_field_boosts.push((field, boost.0));
            }
        }

        // Parse the query text once up front to find out whether any term
        // (e.g. a phrase term) needs the position file.
        let (index_schema, index_fields) = create_index_schema(
            Arc::new(info.index_schema.clone()),
            &info.index_options,
        )?;
        let tokenizer_manager = create_tokenizer_manager(&info.index_options);
        let parser = QueryParser::new(index_schema, index_fields, tokenizer_manager.clone());
        let parsed_query = parser.parse_query(&info.query_text)?;

        let mut need_position = false;
        parsed_query.query_terms(&mut |_, pos| {
            need_position |= pos;
        });

        Ok(Some(Arc::new(Self {
            dal,
            field_nums: info.index_schema.num_fields(),
            // whether the score internal column needs to be generated
            has_score: info.has_score,
            need_position,
            query_fields,
            query_field_boosts,
            tokenizer_manager,
            inverted_index_info: info.clone(),
        })))
    }

    /// Runs the query text against the inverted index of a single block.
    ///
    /// `block_loc` is the location of the block; the index location is derived
    /// from it together with the index name and index version. `row_count` is
    /// forwarded unchanged to the reader's `do_filter`.
    ///
    /// Returns whatever `do_filter` yields.
    /// NOTE(review): presumably `None` means no row matched (the block can be
    /// pruned) and the pairs are `(row index, optional score)` — confirm
    /// against `InvertedIndexReader::do_filter`.
    ///
    /// # Errors
    ///
    /// Propagates errors from creating the reader and from filtering.
    #[async_backtrace::framed]
    pub async fn should_keep(
        &self,
        block_loc: &str,
        row_count: u64,
    ) -> Result<Option<Vec<(usize, Option<F32>)>>> {
        let info = &self.inverted_index_info;

        // Each block has its own index file, addressed via the block location.
        let index_location =
            TableMetaLocationGenerator::gen_inverted_index_location_from_block_location(
                block_loc,
                &info.index_name,
                &info.index_version,
            );

        let reader = InvertedIndexReader::try_create(
            self.dal.clone(),
            self.field_nums,
            self.has_score,
            self.need_position,
            self.query_fields.clone(),
            self.query_field_boosts.clone(),
            self.tokenizer_manager.clone(),
            &index_location,
        )
        .await?;

        let matched = reader.do_filter(&info.query_text, row_count)?;
        Ok(matched)
    }
}
